feat: expand date/time expression support using codegen dispatcher#4417
feat: expand date/time expression support using codegen dispatcher#4417andygrove wants to merge 21 commits into
Conversation
…l short-circuit CometBatchKernelCodegen.defaultBody emitted this.col$ord.isNull(i) for every NullIntolerant input, but primitive Arrow vectors (timestamp / int / float / date / bool / ...) are wrapped in CometPlainVector at input-cast time and expose isNullAt rather than the raw Arrow isNull. The short-circuit therefore failed to compile for any primitive-typed column with a Janino "method isNull not declared" error. Share the existing nullCheckMethod helper between emitTypedGetters and defaultBody so both sites pick the right method name per column. Add a source test that pins the chosen method for TimeStampMicroTZVector inputs.
CometDateFormat keeps the native to_char path for UTC sessions with a format
literal in the strftime-mappable whitelist, and now routes every other case
through the Arrow-direct codegen dispatcher (CometScalaUDFCodegen) so that
non-UTC sessions, non-literal formats, and formats outside the whitelist
stay inside the Comet pipeline running Spark's own DateFormatClass.doGenCode.
Refactor: extract the closure-serialize + JvmScalarUdf-proto emission from
CometScalaUDF.convert into a reusable CometScalaUDF.emitJvmCodegenDispatch
helper. Any serde that wants to fall back to a Spark built-in expression
through the dispatcher can call it. Gated by COMET_SCALA_UDF_CODEGEN_ENABLED
so the default remains a clean Spark fallback for those cases until the
dispatcher graduates from experimental.
Reasoning notes:
- DateFormatClass already has a proper doGenCode (not CodegenFallback),
NullIntolerant, and ResolveTimeZone stamps the timeZoneId on it during
analysis. Closure-serializing the bound tree therefore reproduces
Spark-identical behavior for every timezone.
- The kernel cache key already encodes the literal format and timezone via
the serialized expression bytes, so (format, tz) combinations get
distinct cached kernels just like a bespoke (format, tz) -> formatter
cache would. Saves an entire DateFormatUDF.scala class.
Tests:
- date_format - timestamp_ntz input: now runs checkSparkAnswerAndOperator
for every timezone under the codegen flag instead of falling back for
non-UTC.
- Split each previous "falls back to Spark" Scala test into two: one
asserting the codegen-on path stays in Comet, one asserting the
codegen-off path falls back with the dispatcher flag as the reason.
- date_format.sql now pins a non-UTC session timezone and enables the
codegen flag at file scope; all queries are plain query and assert
in-Comet execution.
The CometScalaUDF fallback message was generalized from 'ScalaUDF has no native path' to 'expression has no native path' when the dispatcher helper was extracted for reuse by CometDateFormat.
- Drop the getCompatibleNotes override on CometCodegenDispatch. The docs generator emits compat notes under a heading promising 'no additional configuration', which contradicts a note describing the dispatcher flag. Keep getSupportLevel=Compatible and surface the flag dependency via withInfo / EXPLAIN instead. - Add a sentinel non-error query to each *_ansi.sql fixture. The expect_error semantics pass vacuously when the dispatcher silently falls back to Spark (both paths throw identical exceptions); the sentinel uses checkSparkAnswerAndOperator and fails if Comet did not run the expression natively. - Pin spark.sql.legacy.timeParserPolicy=CORRECTED in to_unix_timestamp_ansi.sql so the JDK java.time formatter is exercised regardless of runtime default; LEGACY policy uses SimpleDateFormat with a different exception class. - Annotate the three ANSI fixtures with MinSparkVersion: 3.5 since the DATETIME_FIELD_OUT_OF_BOUNDS and CANNOT_PARSE_TIMESTAMP error classes were standardized in Spark 3.5. Spark 3.4 coverage is delivered separately.
Mirror the existing MinSparkVersion gate with a MaxSparkVersion gate so SQL fixtures can pair a 3.5+ variant (using post-3.5 error class names) with a 3.4 variant (using the pre-classification JDK java.time exception text). The make_timestamp and to_unix_timestamp ANSI exception paths produce different exception wording on Spark 3.4 versus 3.5+; before this commit only the 3.5+ side had coverage and 3.4 ANSI behavior went untested. Framework: - SqlTestFile gains maxSparkVersion: Option[String]. - SqlFileTestParser recognizes -- MaxSparkVersion: lines. - CometSqlFileTestSuite gains meetsMaxSparkVersion / skipReason helpers; the skip-and-log path now reports whether the constraint was a floor or ceiling. Coverage: - make_timestamp_ansi_spark34.sql: MaxSparkVersion: 3.4, expect_error patterns target the JDK DateTimeException field-name text (MonthOfYear, Invalid date, HourOfDay) which is stable in 3.4's pre-classification error path. - to_unix_timestamp_ansi_spark34.sql: MaxSparkVersion: 3.4, expect_error pattern targets the JDK DateTimeParseException 'could not be parsed' wording.
a3923d4 to
1b952e7
Compare
CI failed on Spark 3.5.8 because the executor-thrown SparkDateTimeException's
getMessage() does NOT preserve the driver-formatted '[DATETIME_FIELD_OUT_OF_BOUNDS]'
error-class prefix; only the inner JDK message ('Invalid value for MonthOfYear ...',
'Invalid date FEBRUARY 30', 'Invalid value for HourOfDay ...') survives the
'Job aborted ... Lost task ... SparkDateTimeException: <inner>' wrapping that
shows up in the test's caught exception.
Switching to the JDK java.time field-name substrings (MonthOfYear, Invalid date,
HourOfDay) makes the assertions stable across Spark 3.4, 3.5.x, and 4.x without
needing a MinSparkVersion gate, so the make_timestamp_ansi_spark34.sql variant
becomes redundant and is deleted in the same commit.
Verified locally: passes under -Pspark-3.4 (3.4.3) and -Pspark-3.5 (3.5.8).
|
Awesome to see the codegen framework being put to good use, and bugs being found and fixed! A couple of things on the broader dispatcher pattern worth thinking about before the second wave of expressions lands. The sentinel On the cache-key side, three of the new expressions are |
Which issue does this PR close?
Part of #3202. Supersedes and consolidates #4373.
Expressions covered
Ten Spark date/time expressions are routed through the Arrow-direct codegen dispatcher in this PR. All run inside the Comet pipeline (no operator-level Spark fallback) when
spark.comet.exec.scalaUDF.codegen.enabled=trueis set. Default behavior is unchanged when the flag is off.date_format(originally feat: route date_format through codegen dispatcher for non-native cases #4373; nativeto_charpath retained for UTC + whitelisted-format cases, dispatcher path for everything else)add_monthsmonths_betweenmake_timestamptimestamp_millis(MillisToTimestamp)timestamp_micros(MicrosToTimestamp)unix_secondsunix_millisunix_microsto_unix_timestampRationale for this change
Comet's plan rules fall back to Spark for any expression that lacks a native serde, breaking up the Comet pipeline at the operator boundary. The Arrow-direct codegen dispatcher already in tree (
CometScalaUDFCodegen, behindspark.comet.exec.scalaUDF.codegen.enabled) closure-serializes a bound Catalyst expression, ships it through aJvmScalarUdfproto, and Janino-compiles Spark's owndoGenCodeinto a per-batch kernel that reads Arrow vectors and writes an Arrow output vector directly. For any expression whosedoGenCodeis real (notCodegenFallback) and whose input/output types fitCometBatchKernelCodegen.isSupportedDataType, routing through this dispatcher reproduces Spark behavior exactly without a bespoke UDF class.This PR establishes a reusable helper for that routing pattern and applies it to the ten expressions above in two waves:
DateFormatClassfirst (originally #4373, folded in here), then nine currently-unsupported expressions that share the same shape.What changes are included in this PR?
fix(codegen)— Pre-existing bug surfaced while wiringdate_format.CometBatchKernelCodegen.defaultBodyemittedthis.col$ord.isNull(i)for everyNullIntolerantinput, but primitive Arrow vectors are wrapped inCometPlainVectorat input-cast time and exposeisNullAt, notisNull. Janino rejected the kernel with "methodisNullnot declared".emitTypedGettersalready knew the right method name vianullCheckMethod; the fix exposes that helper sodefaultBodypicks the same name per column ordinal. New source test pins the chosen method forTimeStampMicroTZVectorso the regression can't recur.feat— dispatcher helper extraction — Extract the closure-serialize +JvmScalarUdfemission fromCometScalaUDF.convertintoCometScalaUDF.emitJvmCodegenDispatch(expr, inputs, binding)so other serdes can reuse the path.CometDateFormat.convertkeeps the nativeto_charpath for UTC + whitelisted-format cases and now calls the helper for everything else. Gated byspark.comet.exec.scalaUDF.codegen.enabled(default false, experimental); default behavior is unchanged.feat—CometCodegenDispatch[T]base class — A one-class wrapper aroundemitJvmCodegenDispatchlets any expression with a realdoGenCodeslot in with a single-lineobjectdeclaration. The helper marks the expressionCompatible()because the dispatcher runs Spark's owndoGenCodeinside the kernel; behavior matches Spark exactly when the flag is on, and the operator falls back cleanly when it is off.feat— nine new datetime serdes — Each of the nine new expressions becomes a one-line singletonextends CometCodegenDispatch[T]and is registered intemporalExpressions. Three are ANSI-sensitive (MakeTimestamp,MillisToTimestamp,ToUnixTimestampcarryfailOnError); the dispatcher inherits the throw site from Spark's owndoGenCode, so exception semantics propagate without any serde-level branching.test—MaxSparkVersionannotation — Mirror the existingMinSparkVersionparser gate with aMaxSparkVersionceiling. TheCometSqlFileTestSuiteskip logic now reports whether a constraint was a floor or ceiling. This lets fixtures pair a 3.5+ variant against a 3.4 variant where the expected error class wording differs.Interval-producing expressions (
MakeInterval/MakeYMInterval/MakeDTInterval) are explicitly out of scope: the dispatcher'sisSupportedDataTypedoes not include Spark's interval types. Version-conditional expressions (TimestampAdd/TimestampDiff3.4+,DayName3.5+,MonthName4.0+) are deferred to a follow-on so this PR avoids touching theCometExprShimfiles.Scaffolded with the
superpowers:brainstormingandsuperpowers:writing-plansskills.How are these changes tested?
CometTemporalExpressionSuitedate_formattests: 10/10 pass. Three "falls back to Spark" tests are paired with a "routes via codegen dispatcher" sibling that enables the flag and asserts in-Comet execution.date_format - timestamp_ntz inputrunscheckSparkAnswerAndOperatorfor every timezone under the codegen flag.CometSqlFileTestSuite: nine new per-expression SQL fixtures (add_months.sql,months_between.sql,make_timestamp.sql,timestamp_millis.sql,timestamp_micros.sql,unix_seconds.sql,unix_millis.sql,unix_micros.sql,to_unix_timestamp.sql) pin a non-UTC session timezone and the codegen flag at file scope.date_format.sqlfrom feat: route date_format through codegen dispatcher for non-native cases #4373 is included.make_timestamp_ansi.sql/make_timestamp_ansi_spark34.sql,to_unix_timestamp_ansi.sql/to_unix_timestamp_ansi_spark34.sql, plustimestamp_millis_ansi.sql. The Spark 3.5+ files match theDATETIME_FIELD_OUT_OF_BOUNDS/CANNOT_PARSE_TIMESTAMPerror classes; the 3.4 files match the underlying JDKjava.timeexception text (MonthOfYear,Invalid date,HourOfDay,could not be parsed). Each ANSI file includes a sentinel non-errorquerythat usescheckSparkAnswerAndOperatorso a silent dispatcher fallback would fail the file (theexpect_errorpath alone passes vacuously because Spark fallback also throws).CometCodegenSourceSuite: includes the newNullIntolerant short-circuit uses isNullAt for CometPlainVector-wrapped columnsregression test plus a parameterizedBucket 4 datetime expressions produce non-empty generated kernel sourcetest covering all nine new expressions.CometCodegenSuite: no regressions in the dispatcher's existing surface.-Pspark-3.4: 284/284 SQL fixtures pass (3.5+ variants correctly skip).